{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 09: SparkSQL - File Formats\n",
    "\n",
    "This script demonstrates the methods for reading and writing files in the [Parquet](http://parquet.io) and JSON formats. It reads in the same data as in the previous example, writes it to new files in Parquet format, then reads it back in and runs queries on it. Then it repeats the exercise using JSON. Additional support is now available in Spark for ORC files, as well as old stand-bys like CSV.\n",
    "\n",
    "The key [SparkSession](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.SparkSession) and [Dataset](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset) methods are `SparkSession.read.parquet(inpath)` and `Dataset.write.save(outpath)` for reading and writing Parquet, and `SparkSession.read.json(inpath)` and `Dataset.write.json(outpath)` for reading and writing JSON. (The format for the first `write.save` method can be overridden to default to a different format.)\n",
    "\n",
    "See the corresponding \"script\" suitable for _spark-shell_, [SparkSQLFileFormats9-script.scala](https://github.com/deanwampler/spark-scala-tutorial/blob/master/src/main/scala/sparktutorial/SparkSQLFileFormats9-script.scala)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "in = ../data/kjvdat.txt\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "../data/kjvdat.txt"
      ]
     },
     "execution_count": 1,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val in = \"../data/kjvdat.txt\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Parse the lines into `Verse` instances as before."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "defined class Verse\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "case class Verse(book: String, chapter: Int, verse: Int, text: String)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "lineRE = ^\\s*([^|]+)\\s*\\|\\s*([\\d]+)\\s*\\|\\s*([\\d]+)\\s*\\|\\s*(.*)~?\\s*$\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "^\\s*([^|]+)\\s*\\|\\s*([\\d]+)\\s*\\|\\s*([\\d]+)\\s*\\|\\s*(.*)~?\\s*$"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val lineRE = \"\"\"^\\s*([^|]+)\\s*\\|\\s*([\\d]+)\\s*\\|\\s*([\\d]+)\\s*\\|\\s*(.*)~?\\s*$\"\"\".r"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "versesRDD = MapPartitionsRDD[2] at flatMap at <console>:33\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "MapPartitionsRDD[2] at flatMap at <console>:33"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val versesRDD = sc.textFile(in).flatMap {\n",
    "  case lineRE(book, chapter, verse, text) =>\n",
    "    Seq(Verse(book, chapter.toInt, verse.toInt, text))\n",
    "  case line =>\n",
    "    Console.err.println(s\"Unexpected line: $line\")\n",
    "    Nil // or use Seq.empty[Verse]. It will be eliminated by flattening.\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "verses = [book: string, chapter: int ... 2 more fields]\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "[book: string, chapter: int ... 2 more fields]"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val verses = spark.createDataFrame(versesRDD)\n",
    "verses.createOrReplaceTempView(\"kjv_bible\")\n",
    "verses.cache()  // not really needed in this notebook, but..."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Save as Parquet. (Remember to delete this output if you run this bit again...)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Saving 'verses' as a Parquet file to output/parquet.\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "parquetDir = output/parquet\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "output/parquet"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val parquetDir = \"output/parquet\"\n",
    "println(s\"Saving 'verses' as a Parquet file to $parquetDir.\")\n",
    "verses.write.parquet(parquetDir)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now read it back in as a new `Dataset`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Reading in the Parquet file from output/parquet:\n",
      "+----+-------+-----+--------------------+\n",
      "|book|chapter|verse|                text|\n",
      "+----+-------+-----+--------------------+\n",
      "| Psa|     68|   11|The Lord gave the...|\n",
      "| Psa|     68|   12|Kings of armies d...|\n",
      "| Psa|     68|   13|Though ye have li...|\n",
      "| Psa|     68|   14|When the Almighty...|\n",
      "| Psa|     68|   15|The hill of God i...|\n",
      "| Psa|     68|   16|Why leap ye, ye h...|\n",
      "| Psa|     68|   17|The chariots of G...|\n",
      "| Psa|     68|   18|Thou hast ascende...|\n",
      "| Psa|     68|   19|Blessed be the Lo...|\n",
      "| Psa|     68|   20|He that is our Go...|\n",
      "| Psa|     68|   21|But God shall wou...|\n",
      "| Psa|     68|   22|The Lord said, I ...|\n",
      "| Psa|     68|   23|That thy foot may...|\n",
      "| Psa|     68|   24|They have seen th...|\n",
      "| Psa|     68|   25|The singers went ...|\n",
      "| Psa|     68|   26|Bless ye God in t...|\n",
      "| Psa|     68|   27|There is little B...|\n",
      "| Psa|     68|   28|Thy God hath comm...|\n",
      "| Psa|     68|   29|Because of thy te...|\n",
      "| Psa|     68|   30|Rebuke the compan...|\n",
      "+----+-------+-----+--------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "verses2 = [book: string, chapter: int ... 2 more fields]\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "[book: string, chapter: int ... 2 more fields]"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "println(s\"Reading in the Parquet file from $parquetDir:\")\n",
    "val verses2 = spark.read.parquet(parquetDir)\n",
    "verses2.createOrReplaceTempView(\"verses2\")\n",
    "verses2.show"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Run a SQL query..."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Number of Jesus Verses: 936\n",
      "+----+-------+-----+--------------------+\n",
      "|book|chapter|verse|                text|\n",
      "+----+-------+-----+--------------------+\n",
      "| Mat|      1|    1|The book of the g...|\n",
      "| Mat|      1|   16|And Jacob begat J...|\n",
      "| Mat|      1|   18|Now the birth of ...|\n",
      "| Mat|      2|    1|Now when Jesus wa...|\n",
      "| Mat|      3|   13|Then cometh Jesus...|\n",
      "| Mat|      3|   15|And Jesus answeri...|\n",
      "| Mat|      3|   16|And Jesus, when h...|\n",
      "| Mat|      4|    1|Then was Jesus le...|\n",
      "| Mat|      4|    7|Jesus said unto h...|\n",
      "| Mat|      4|   10|Then saith Jesus ...|\n",
      "| Mat|      4|   12|Now when Jesus ha...|\n",
      "| Mat|      4|   17|From that time Je...|\n",
      "| Mat|      4|   18|And Jesus, walkin...|\n",
      "| Mat|      4|   23|And Jesus went ab...|\n",
      "| Mat|      7|   28|And it came to pa...|\n",
      "| Mat|      8|    3|And Jesus put for...|\n",
      "| Mat|      8|    4|And Jesus saith u...|\n",
      "| Mat|      8|    5|And when Jesus wa...|\n",
      "| Mat|      8|    7|And Jesus saith u...|\n",
      "| Mat|      8|   10|When Jesus heard ...|\n",
      "+----+-------+-----+--------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "jesusVerses = [book: string, chapter: int ... 2 more fields]\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "[book: string, chapter: int ... 2 more fields]"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val jesusVerses = spark.sql(\"SELECT * FROM verses2 WHERE text LIKE '%Jesus%'\")\n",
    "println(\"Number of Jesus Verses: \"+jesusVerses.count())\n",
    "jesusVerses.show"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now work with JSON. This _requires_ each JSON \"document\" to be on a single line.\n",
    "Let's first right some JSON."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Saving 'verses' as a JSON file to output/json.\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "jsonDir = output/json\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "output/json"
      ]
     },
     "execution_count": 9,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val jsonDir = \"output/json\"\n",
    "println(s\"Saving 'verses' as a JSON file to $jsonDir.\")\n",
    "verses.write.json(jsonDir)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "... and read it back in."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----+-------+--------------------+-----+\n",
      "|book|chapter|                text|verse|\n",
      "+----+-------+--------------------+-----+\n",
      "| Psa|     68|The Lord gave the...|   11|\n",
      "| Psa|     68|Kings of armies d...|   12|\n",
      "| Psa|     68|Though ye have li...|   13|\n",
      "| Psa|     68|When the Almighty...|   14|\n",
      "| Psa|     68|The hill of God i...|   15|\n",
      "| Psa|     68|Why leap ye, ye h...|   16|\n",
      "| Psa|     68|The chariots of G...|   17|\n",
      "| Psa|     68|Thou hast ascende...|   18|\n",
      "| Psa|     68|Blessed be the Lo...|   19|\n",
      "| Psa|     68|He that is our Go...|   20|\n",
      "| Psa|     68|But God shall wou...|   21|\n",
      "| Psa|     68|The Lord said, I ...|   22|\n",
      "| Psa|     68|That thy foot may...|   23|\n",
      "| Psa|     68|They have seen th...|   24|\n",
      "| Psa|     68|The singers went ...|   25|\n",
      "| Psa|     68|Bless ye God in t...|   26|\n",
      "| Psa|     68|There is little B...|   27|\n",
      "| Psa|     68|Thy God hath comm...|   28|\n",
      "| Psa|     68|Because of thy te...|   29|\n",
      "| Psa|     68|Rebuke the compan...|   30|\n",
      "+----+-------+--------------------+-----+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "versesJSON = [book: string, chapter: bigint ... 2 more fields]\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "[book: string, chapter: bigint ... 2 more fields]"
      ]
     },
     "execution_count": 10,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val versesJSON = spark.read.json(jsonDir)\n",
    "versesJSON.show"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Note that the book order isn't preserved."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Apache Toree - Scala",
   "language": "scala",
   "name": "apache_toree_scala"
  },
  "language_info": {
   "codemirror_mode": "text/x-scala",
   "file_extension": ".scala",
   "mimetype": "text/x-scala",
   "name": "scala",
   "pygments_lexer": "scala",
   "version": "2.11.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
